Visual Builder Integration - Technical Documentation
**Phase:** 207 - Visual Builder Integration
**Status:** COMPLETE ✅
**Last Updated:** 2026-03-28
Table of Contents
- Overview
- Architecture
- Template to ReactFlow Conversion
- Node Types
- Variable Substitution
- API Endpoints
- Tenant Isolation
- Execution Flow
- Error Handling
- Testing
- Future Enhancements
---
Overview
Phase 207 Objectives
Phase 207 connects the ReactFlow visual builder to the enhanced workflow system built in Phases 201-206. The visual builder enables users to create, customize, and execute workflows through a drag-and-drop interface without writing code.
**Key Goals:**
- Load workflow templates from WorkflowTemplateManager into visual builder
- Populate integration nodes from 39+ capability-discovered integrations
- Support entity nodes for CRUD operations
- Support agent nodes with maturity-based governance
- Execute workflows via UnifiedActionExecutor
- Provide variable substitution UI
- Enable "Save as Template" functionality
Integration with Phases 201-206
The visual builder sits on top of the workflow enhancement foundation:
- **Phase 201 (Integration Registry):** Provides capability-discovered integrations for dynamic node population
- **Phase 202 (Integration Action Enhancement):** UnifiedActionExecutor dispatches workflow steps
- **Phase 203 (Outflow Wrappers):** Async execution layer for native integrations
- **Phase 204 (Agent Step Integration):** AI agents trigger integrations with governance
- **Phase 205 (Integration Wrappers):** Zoom and Notion wrapper implementations
- **Phase 206 (Workflow Templates):** Template catalog and storage infrastructure
- **Phase 207 (Visual Builder):** UI layer for template browsing, customization, and execution
Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│ Frontend Layer │
├─────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Template │ │ Workflow │ │ Node │ │
│ │ Browser │→ │ Builder │→ │ Config │ │
│ │ (207-01) │ │ (207-02) │ │ Sidebar │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ↓ ↓ ↓ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Integration │ │ Variable │ │ Template │ │
│ │ Node │ │ Substitution │ │ Save │ │
│ │ Selector │ │ Panel │ │ (207-08) │ │
│ │ (207-03) │ │ (207-07) │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓ HTTPS
┌─────────────────────────────────────────────────────────────────┐
│ API Layer │
├─────────────────────────────────────────────────────────────────┤
│ GET /api/workflows-templates (Template listing) │
│ GET /api/workflows-templates/{id} (Template fetch) │
│ GET /api/integrations (Capability discovery) │
│ GET /api/entities/types (Entity types) │
│ GET /api/agents (Agent listing) │
│ POST /api/workflows/execute (Workflow execution) │
│ POST /api/workflows/save-template (Save template) │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ Backend Services Layer │
├─────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Workflow │ │ Integration │ │ Entity │ │
│ │ Template │ │ Registry │ │ Skill │ │
│ │ Manager │ │ (201-02) │ │ Executor │ │
│ │ (206-01) │ │ │ │ (204-03) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ↓ ↓ ↓ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Unified │ │ Agent │ │ Outflow │ │
│ │ Action │ │ Governance │ │ Wrapper │ │
│ │ Executor │ │ Service │ │ Factory │ │
│ │ (202-01) │ │ (204-01) │ │ (203-01) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ Data Layer │
├─────────────────────────────────────────────────────────────────┤
│ workflow_templates/ (File-based template storage) │
│ TenantIntegration (Tenant integration configurations) │
│ OAuthToken (OAuth credentials per tenant) │
│ Entity (Custom entity types) │
│ Agent (AI agents with maturity levels) │
└─────────────────────────────────────────────────────────────────┘Key Components and Their Responsibilities
Frontend Components
| Component | File | Responsibility |
|---|---|---|
| **WorkflowBuilder** | src/components/Automations/WorkflowBuilder.tsx | Main orchestrator for visual editor, manages ReactFlow state, handles template loading, node configuration, workflow execution |
| **TemplateBrowser** | src/components/Automations/TemplateBrowser.tsx | Template discovery UI with category filtering, search, tenant-scoped browsing, template cards with metadata |
| **IntegrationNodeSelector** | src/components/Automations/IntegrationNodeSelector.tsx | Integration node creation from capability-discovered integrations, connection status display, operation configuration |
| **EntityNodeSelector** | src/components/Automations/EntityNodeSelector.tsx | Entity node creation for CRUD operations, entity type selection, maturity-based permission display |
| **AgentNodeSelector** | src/components/Automations/AgentNodeSelector.tsx | Agent node creation, agent selection, maturity level display, governance restrictions |
| **VariableSubstitutionPanel** | src/components/Automations/VariableSubstitutionPanel.tsx | Variable input UI, variable extraction and validation, real-time preview of resolved values |
Backend Services
| Service | File | Responsibility |
|---|---|---|
| **WorkflowTemplateManager** | backend-saas/core/workflow_template_system.py | Template storage and retrieval, file-based loading from workflow_templates/, template validation, template instantiation |
| **IntegrationRegistry** | backend-saas/core/integration_registry.py | Dynamic service loading, capability discovery, credential resolution, per-tenant service caching |
| **UnifiedActionExecutor** | backend-saas/core/unified_action_executor.py | Workflow step dispatch, governance checks, quota enforcement, integration routing, error handling |
| **EntitySkillExecutor** | backend-saas/core/entity_skill_executor.py | Entity CRUD operations, maturity-based permissions, entity type validation |
| **AgentGovernanceService** | backend-saas/core/agent_governance.py | Agent maturity checks, action authorization, privilege escalation prevention |
Utilities
| Utility | File | Responsibility |
|---|---|---|
| **template-loader.ts** | src/lib/workflow/template-loader.ts | Template to ReactFlow conversion, tree layout algorithm, node positioning, edge generation from dependencies |
| **variables.ts** | src/lib/workflow/variables.ts | Variable extraction from templates, variable source identification, validation |
| **executor.ts** | src/lib/workflow/executor.ts | ReactFlow nodes to workflow steps conversion, execution API client, result formatting |
---
Architecture
Frontend Components
WorkflowBuilder (Orchestrator)
**File:** src/components/Automations/WorkflowBuilder.tsx
**Responsibilities:**
- Manage ReactFlow state (nodes, edges, selection)
- Handle template loading via
initialTemplateIdprop - Provide undo/redo functionality via
useUndoRedohook - Execute workflows via WorkflowExecutor client
- Save workflows as templates
- Display optimization suggestions and variable panel
**Key Props:**
interface WorkflowBuilderProps {
onSave?: (data: { nodes: Node[]; edges: Edge[] }) => void;
initialData?: { nodes: Node[]; edges: Edge[] };
initialTemplateId?: string; // Load template on mount
}**State Management:**
const [nodes, setNodes, onNodesChange] = useNodesState(initialNodes);
const [edges, setEdges, onEdgesChange] = useEdgesState(initialEdges);
const [variableValues, setVariableValues] = useState<Record<string, any>>({});
const [templateInputs, setTemplateInputs] = useState<TemplateInput[]>([]);**Key Functions:**
handleLoadTemplate(templateId)- Fetch template and convert to ReactFlowhandleExecuteWorkflow()- Convert nodes to steps and executehandleSaveAsTemplate()- Save current workflow as templatehandleOptimize()- Generate optimization suggestions
TemplateBrowser (Template Discovery)
**File:** src/components/Automations/TemplateBrowser.tsx
**Responsibilities:**
- Fetch and display templates from backend
- Filter by category (All, Integration, Automation, Approval)
- Search templates by name and description (debounced 300ms)
- Display template metadata (complexity, steps, tags)
- Tenant-scoped browsing (public + own templates)
**Props:**
interface TemplateBrowserProps {
onLoadTemplate: (templateId: string) => void;
onClose?: () => void;
}**API Integration:**
const fetchTemplates = async (category?: string, search?: string) => {
const params = new URLSearchParams();
if (category && category !== "All") params.append("category", category);
if (search) params.append("search", search);
const response = await fetch(`/api/workflows-templates?${params}`);
const data = await response.json();
return data.templates;
};**Template Card Display:**
- Template name and description
- Complexity badge (color-coded)
- Step count
- Category tag
- Public/private indicator
- Tags list
IntegrationNodeSelector (Integration Nodes)
**File:** src/components/Automations/IntegrationNodeSelector.tsx
**Responsibilities:**
- Fetch available integrations from IntegrationRegistry
- Display integration connection status
- List operations for each integration
- Create integration nodes with operation configuration
**Node Data Structure:**
{
id: string;
type: "integration";
position: {
x: number;
y: number;
}
data: {
label: string;
connector_id: string; // e.g., 'slack', 'salesforce'
operation: string; // e.g., 'send_message', 'create_lead'
parameters: Array<{ name: string; value: any }>;
connection_status: "connected" | "not_connected";
}
}**Connection Status Display:**
- Green badge: Connected
- Red badge: Not connected
- Click to connect OAuth
EntityNodeSelector (Entity Nodes)
**File:** src/components/Automations/EntityNodeSelector.tsx
**Responsibilities:**
- Fetch entity types from backend
- Display entity maturity levels with color coding
- Create entity nodes for CRUD operations
**Node Data Structure:**
{
id: string;
type: "entity";
position: {
x: number;
y: number;
}
data: {
label: string;
entity_type: string; // e.g., 'Lead', 'Task', 'Deal'
operation: "create" | "read" | "update" | "delete" | "query";
parameters: Record<string, any>;
maturity_level: "student" | "intern" | "supervised" | "autonomous";
}
}**Maturity Color Coding:**
- Student: Gray (read-only)
- Intern: Blue (proposals)
- Supervised: Yellow (monitoring)
- Autonomous: Green (full access)
AgentNodeSelector (Agent Nodes)
**File:** src/components/Automations/AgentNodeSelector.tsx
**Responsibilities:**
- Fetch agents from backend
- Display agent maturity levels
- Show governance restrictions
- Create agent nodes
**Node Data Structure:**
{
id: string;
type: 'ai_node';
position: { x: number; y: number };
data: {
label: string;
agent_id: string;
agent_role: string;
maturity_level: 'student' | 'intern' | 'supervised' | 'autonomous';
prompt: string;
governance_restrictions: string[];
};
}**Governance Display:**
- Student: "Read-only access"
- Intern: "Approval required for actions"
- Supervised: "Live monitoring"
- Autonomous: "Full access"
VariableSubstitutionPanel (Variables)
**File:** src/components/Automations/VariableSubstitutionPanel.tsx
**Responsibilities:**
- Extract variables from template inputs and workflow steps
- Display variable input fields
- Validate variable values
- Preview resolved values
**Variable Types:**
${input.field_name}- Template input variables${step_N.field_name}- Step output variables (N = step index)${step_step_id.field_name}- Step output variables (step_id = step identifier)
---
Backend Services
WorkflowTemplateManager (Template Storage)
**File:** backend-saas/core/workflow_template_system.py
**Responsibilities:**
- Load templates from
workflow_templates/directory - Validate template schema
- Instantiate templates with input values
- Save custom templates
- Reload templates without application restart
**Key Methods:**
class WorkflowTemplateManager:
def load_templates(self) -> None:
"""Load all templates from workflow_templates/ directory."""
def get_template(self, template_id: str) -> Optional[WorkflowTemplate]:
"""Get template by ID."""
def instantiate_template(self, template_id: str, inputs: Dict[str, Any]) -> WorkflowInstance:
"""Instantiate template with input values."""
def save_template(self, template: WorkflowTemplate, tenant_id: str) -> str:
"""Save template to workflow_templates/ directory."""
def reload_templates(self) -> int:
"""Reload all templates from disk."""**Template Storage:**
- File-based JSON storage in
workflow_templates/ - Subdirectories:
integration/,automation/,approval/ - Naming convention:
{category}-{name}-{version}.json - Git version control
IntegrationRegistry (Capability Discovery)
**File:** backend-saas/core/integration_registry.py
**Responsibilities:**
- Dynamically load integration services
- Discover available operations
- Resolve credentials per tenant
- Cache service instances per tenant
**Key Methods:**
class IntegrationRegistry:
def get_service(self, tenant_id: str, connector_id: str) -> IntegrationService:
"""Get integration service instance for tenant."""
def discover_capabilities(self, connector_id: str) -> Dict[str, Any]:
"""Discover available operations for integration."""
def resolve_credentials(self, tenant_id: str, connector_id: str) -> Dict[str, Any]:
"""Resolve credentials for integration."""**Capability Discovery:**
- Reflect on integration service methods
- Extract operation signatures
- Return parameter schemas
- Cache results per connector_id
UnifiedActionExecutor (Workflow Execution)
**File:** backend-saas/core/unified_action_executor.py
**Responsibilities:**
- Dispatch workflow steps to appropriate executors
- Check governance before execution
- Enforce quota limits
- Handle errors and retries
- Log execution results
**Key Methods:**
class UnifiedActionExecutor:
async def dispatch(self, tenant_id: str, step: WorkflowStep, context: Dict[str, Any]) -> ActionResult:
"""Dispatch workflow step to appropriate executor."""
async def dispatch_integration_action(self, tenant_id: str, connector_id: str, action: str, params: Dict[str, Any]) -> ActionResult:
"""Dispatch integration action via IntegrationRegistry."""
async def dispatch_entity_operation(self, tenant_id: str, entity_type: str, operation: str, params: Dict[str, Any]) -> ActionResult:
"""Dispatch entity operation via EntitySkillExecutor."""
async def dispatch_agent_step(self, tenant_id: str, agent_id: str, step: WorkflowStep, context: Dict[str, Any]) -> ActionResult:
"""Dispatch agent step via reasoning engine."""**Execution Flow:**
- Validate step structure
- Check governance (AgentGovernanceService)
- Check quota (QuotaManagerService)
- Route to appropriate executor
- Execute with retry logic
- Log action result
- Return output
EntitySkillExecutor (Entity Operations)
**File:** backend-saas/core/entity_skill_executor.py
**Responsibilities:**
- Execute entity CRUD operations
- Validate entity types
- Enforce maturity-based permissions
- Query entities with filters
**Key Methods:**
class EntitySkillExecutor:
async def create_entity(self, tenant_id: str, entity_type: str, data: Dict[str, Any], maturity_level: str) -> Entity:
"""Create entity instance."""
async def read_entity(self, tenant_id: str, entity_type: str, entity_id: str, maturity_level: str) -> Entity:
"""Read entity instance."""
async def update_entity(self, tenant_id: str, entity_type: str, entity_id: str, data: Dict[str, Any], maturity_level: str) -> Entity:
"""Update entity instance."""
async def delete_entity(self, tenant_id: str, entity_type: str, entity_id: str, maturity_level: str) -> None:
"""Delete entity instance."""
async def query_entities(self, tenant_id: str, entity_type: str, filters: Dict[str, Any], maturity_level: str) -> List[Entity]:
"""Query entities with filters."""**Maturity Permissions:**
- Student: Read only
- Intern: Read + create (with approval)
- Supervised: Read + create + update (with monitoring)
- Autonomous: Full CRUD access
---
Data Flow: Template → ReactFlow → Execution → Results
1. Template Loading Flow
User clicks template in TemplateBrowser
↓
TemplateBrowser.onLoadTemplate(templateId)
↓
WorkflowBuilder.handleLoadTemplate(templateId)
↓
GET /api/workflows-templates/{templateId}
↓
WorkflowTemplateManager.get_template(templateId)
↓
Template returned as JSON
↓
templateToReactFlow(template) in template-loader.ts
↓
Generate ReactFlow nodes and edges
↓
setNodes(), setEdges() in WorkflowBuilder
↓
Template displayed on canvas2. Workflow Execution Flow
User clicks "Execute Workflow" button
↓
WorkflowBuilder.handleExecuteWorkflow()
↓
nodesToWorkflowSteps(nodes) in executor.ts
↓
POST /api/workflows/execute
↓
UnifiedActionExecutor.dispatch() for each step
↓
Governance check (AgentGovernanceService)
↓
Quota check (QuotaManagerService)
↓
Route to executor (Integration/Entity/Agent)
↓
Execute with retry logic
↓
Return results
↓
formatExecutionResults(results)
↓
Display results in UI3. Save as Template Flow
User clicks "Save as Template" button
↓
WorkflowBuilder.handleSaveAsTemplate()
↓
Prompt for template metadata (name, description, category)
↓
reactFlowToTemplate(nodes, edges, metadata)
↓
POST /api/workflows/save-template
↓
WorkflowTemplateManager.save_template()
↓
Validate template schema
↓
Save to workflow_templates/{category}-{name}-{version}.json
↓
Return success message---
Template to ReactFlow Conversion
templateToReactFlow() Algorithm
**File:** src/lib/workflow/template-loader.ts
**Function Signature:**
export function templateToReactFlow(template: WorkflowTemplate): {
nodes: Node[];
edges: Edge[];
complexity: TemplateComplexity;
};**Algorithm Steps:**
- **Validate Template**
- Check required fields (template_id, name, inputs, steps)
- Validate step structure
- Return error if invalid
- **Convert Steps to Nodes**
- Map each step to a ReactFlow node
- Apply node type mapping (see table below)
- Extract node data from step definition
- Calculate node position (see positioning algorithm)
- **Generate Edges from Dependencies**
- For each step, iterate through
depends_onarray - Create edge from dependency step to current step
- Use
addStepEdgetype for custom styling - Set edge label (optional)
- **Calculate Complexity Metrics**
- Count total steps
- Calculate max depth (longest dependency chain)
- Calculate branching factor (average dependencies per step)
- **Return Nodes, Edges, Complexity**
- Nodes array with positions
- Edges array with connections
- Complexity object with metrics
Node Type Mapping Table
| Template Step Type | ReactFlow Node Type | Description |
|---|---|---|
trigger | trigger | Webhook or schedule trigger |
agent_execution | ai_node | AI agent execution step |
integration_action | action | Integration operation |
conditional_logic | condition | Conditional branching |
entity_operation | entity | Entity CRUD operation |
loop | loop | Loop iteration |
**Example Mapping:**
const nodeTypeMap: Record<string, string> = {
trigger: "trigger",
agent_execution: "ai_node",
integration_action: "action",
conditional_logic: "condition",
entity_operation: "entity",
loop: "loop",
};
const nodeType = nodeTypeMap[step.step_type] || "action";Edge Generation from Dependencies
**Algorithm:**
function generateEdges(steps: WorkflowStep[]): Edge[] {
const edges: Edge[] = [];
const stepIndexMap = new Map<string, number>();
// Build step index map
steps.forEach((step, index) => {
stepIndexMap.set(step.step_id, index);
});
// Generate edges from dependencies
steps.forEach((step, index) => {
if (step.depends_on) {
step.depends_on.forEach((depId) => {
const depIndex = stepIndexMap.get(depId);
if (depIndex !== undefined) {
edges.push({
id: `e${depIndex}-${index}`,
source: depIndex.toString(),
target: index.toString(),
type: "addStepEdge",
});
}
});
}
});
return edges;
}**Edge Structure:**
interface Edge {
id: string;
source: string; // Source node ID
target: string; // Target node ID
type?: string; // Edge type (default, addStepEdge, etc.)
label?: string; // Optional label
animated?: boolean; // Animation flag
style?: React.CSSProperties; // Custom styles
}Position Calculation Logic
**Tree Layout Algorithm (DFS-based):**
function generateNodePositions(
steps: WorkflowStep[],
): Map<string, { x: number; y: number }> {
const positions = new Map<string, { x: number; y: number }>();
const visited = new Set<string>();
const depths = new Map<string, number>();
// Calculate depths using DFS
function calculateDepth(stepId: string, depth: number): void {
depths.set(stepId, depth);
const step = steps.find((s) => s.step_id === stepId);
if (step?.depends_on) {
step.depends_on.forEach((depId) => {
if (!visited.has(depId)) {
visited.add(depId);
calculateDepth(depId, depth + 1);
}
});
}
}
// Start from root nodes (no dependencies)
steps.forEach((step) => {
if (!step.depends_on || step.depends_on.length === 0) {
calculateDepth(step.step_id, 0);
}
});
// Group steps by depth
const depthGroups = new Map<number, string[]>();
depths.forEach((depth, stepId) => {
if (!depthGroups.has(depth)) {
depthGroups.set(depth, []);
}
depthGroups.get(depth)!.push(stepId);
});
// Calculate positions
const HORIZONTAL_SPACING = 300;
const VERTICAL_SPACING = 150;
depthGroups.forEach((stepIds, depth) => {
const y = depth * VERTICAL_SPACING;
const width = (stepIds.length - 1) * HORIZONTAL_SPACING;
const startX = -width / 2;
stepIds.forEach((stepId, index) => {
const x = startX + index * HORIZONTAL_SPACING;
positions.set(stepId, { x, y });
});
});
return positions;
}**Positioning Strategy:**
- Calculate depth of each step using DFS traversal
- Group steps by depth level
- Center each level horizontally
- Apply constant vertical spacing between levels
- Apply constant horizontal spacing between nodes at same level
---
Node Types
IntegrationNode: Capability-Discovered Operations
**Purpose:** Represent integration operations dynamically discovered from IntegrationRegistry.
**Node Data Structure:**
interface IntegrationNodeData {
label: string; // Display name (e.g., "Send Slack Message")
connector_id: string; // Integration identifier (e.g., "slack")
operation: string; // Operation name (e.g., "send_message")
parameters: Array<{
name: string; // Parameter name
value: any; // Parameter value (can include variable references)
type: string; // Parameter type (string, number, boolean, etc.)
required: boolean; // Whether parameter is required
}>;
connection_status: "connected" | "not_connected"; // OAuth connection status
}**Features:**
- Dynamic operation list from IntegrationRegistry
- Connection status indicator (green = connected, red = not connected)
- Parameter configuration with variable substitution
- OAuth credential resolution per tenant
**Example:**
{
id: 'node-1',
type: 'integration',
position: { x: 100, y: 100 },
data: {
label: 'Send Slack Message',
connector_id: 'slack',
operation: 'send_message',
parameters: [
{ name: 'channel', value: '${input.slack_channel}', type: 'string', required: true },
{ name: 'message', value: '${step_1.result}', type: 'string', required: true },
],
connection_status: 'connected',
},
}EntityNode: CRUD Operations with Maturity Colors
**Purpose:** Represent entity CRUD operations with maturity-based permission display.
**Node Data Structure:**
interface EntityNodeData {
label: string; // Display name (e.g., "Create Lead")
entity_type: string; // Entity type name (e.g., "Lead", "Task", "Deal")
operation: "create" | "read" | "update" | "delete" | "query"; // CRUD operation
parameters: Record<string, any>; // Entity data
maturity_level: "student" | "intern" | "supervised" | "autonomous"; // Required maturity
maturity_color: string; // Color code based on maturity
}**Maturity Color Coding:**
- **Student (Gray):** Read-only operations
- **Intern (Blue):** Create operations with approval
- **Supervised (Yellow):** Update operations with monitoring
- **Autonomous (Green):** Full CRUD access
**Features:**
- Entity type selection from tenant's custom entities
- Operation selection based on maturity permissions
- Color-coded maturity display
- Parameter validation against entity schema
**Example:**
{
id: 'node-2',
type: 'entity',
position: { x: 100, y: 300 },
data: {
label: 'Create Lead',
entity_type: 'Lead',
operation: 'create',
parameters: {
first_name: '${input.first_name}',
last_name: '${input.last_name}',
email: '${input.email}',
status: 'new',
},
maturity_level: 'intern',
maturity_color: '#3b82f6', // Blue
},
}AgentNode: Maturity-Based Governance Display
**Purpose:** Represent AI agent execution steps with governance restrictions.
**Node Data Structure:**
interface AgentNodeData {
label: string; // Display name (e.g., "Analyze Lead")
agent_id: string; // Agent identifier
agent_role: string; // Agent role (e.g., "Sales Assistant")
maturity_level: "student" | "intern" | "supervised" | "autonomous"; // Agent maturity
prompt: string; // Task prompt for agent
governance_restrictions: string[]; // List of governance restrictions
can_execute: boolean; // Whether agent can execute operation
}**Governance Restrictions by Maturity:**
- **Student:** Read-only operations, no actions
- **Intern:** Approval required for actions, limited operations
- **Supervised:** Monitoring required, can execute most operations
- **Autonomous:** Full access, can supervise other agents
**Features:**
- Agent selection from tenant's agents
- Maturity level display with restrictions
- Governance restriction list
- Permission validation before execution
**Example:**
{
id: 'node-3',
type: 'ai_node',
position: { x: 100, y: 500 },
data: {
label: 'Analyze Lead',
agent_id: 'agent-123',
agent_role: 'Sales Assistant',
maturity_level: 'intern',
prompt: 'Analyze this lead and determine if it qualifies for follow-up',
governance_restrictions: [
'Approval required for actions',
'Cannot delete records',
'Limited to read operations',
],
can_execute: true,
},
}TriggerNode: Workflow Triggers
**Purpose:** Represent workflow trigger events (webhooks, schedules).
**Node Data Structure:**
interface TriggerNodeData {
label: string; // Display name
trigger_type: "webhook" | "schedule" | "manual"; // Trigger type
webhook_url?: string; // Webhook URL (if webhook trigger)
schedule?: string; // Cron expression (if schedule trigger)
schema?: any; // Input schema for webhook payload
}**Trigger Types:**
- **Webhook:** HTTP endpoint trigger
- **Schedule:** Cron-based trigger
- **Manual:** Manual execution trigger
**Example:**
{
id: 'node-0',
type: 'trigger',
position: { x: 100, y: 0 },
data: {
label: 'Webhook Start',
trigger_type: 'webhook',
webhook_url: 'https://atomagentos.com/api/webhooks/wh-123',
schema: {
type: 'object',
properties: {
userId: { type: 'string' },
action: { type: 'string' },
},
},
},
}ActionNode: Generic Action Nodes
**Purpose:** Represent generic workflow actions (email, delay, log, etc.).
**Node Data Structure:**
interface ActionNodeData {
label: string; // Display name
action_type: string; // Action type
parameters: Record<string, any>; // Action parameters
}**Example:**
{
id: 'node-4',
type: 'action',
position: { x: 100, y: 700 },
data: {
label: 'Send Email',
action_type: 'send_email',
parameters: {
to: '${input.email}',
subject: 'Workflow Complete',
body: 'Your workflow has completed successfully.',
},
},
}ConditionNode: Conditional Branching
**Purpose:** Represent conditional logic for workflow branching.
**Node Data Structure:**
interface ConditionNodeData {
label: string; // Display name
condition: string; // Condition expression
true_label: string; // Label for true branch
false_label: string; // Label for false branch
}**Example:**
{
id: 'node-5',
type: 'condition',
position: { x: 100, y: 400 },
data: {
label: 'Is Lead Qualified?',
condition: '${step_1.score} > 80',
true_label: 'Yes',
false_label: 'No',
},
}---
Variable Substitution
${input.field} and ${step_N.field} Patterns
Variable substitution allows dynamic parameter values in workflow templates.
**Supported Patterns:**
- **Input Variables:**
${input.field_name}
- Reference template input values
- Evaluated at workflow start
- User-provided values
- **Step Output Variables:**
${step_N.field_name}
- Reference outputs from previous steps
- N is the step index (0-based)
- Evaluated during workflow execution
- **Step ID Variables:**
${step_step_id.field_name}
- Reference outputs using step identifier
- More readable than numeric index
- Evaluated during workflow execution
**Example:**
{
"step_id": "send_notification",
"parameters": [
{
"name": "message",
"default_value": "Workflow '${input.workflow_name}' completed with status: ${step_1.status}"
}
]
}Variable Extraction Algorithm
**File:** src/lib/workflow/variables.ts
**Function:**
export function extractVariables(template: WorkflowTemplate): {
inputs: TemplateInput[];
variables: VariableReference[];
};**Algorithm Steps:**
- **Extract Input Variables**
- Parse
template.inputsarray - Extract input name, type, default value
- Return as
TemplateInput[]
- **Extract Variable References**
- Scan all step parameters
- Find
${...}patterns using regex - Parse variable source and field path
- Return as
VariableReference[]
- **Validate Variable References**
- Check input variables exist in template.inputs
- Check step references exist in template.steps
- Return validation errors
**Regex Pattern:**
const VARIABLE_PATTERN = /\$\{(?:input|step_\d+|step_[\w-]+)\.[\w.]+\}/g;Substitution at Execution Time
**Backend Substitution (Python):**
**File:** backend-saas/core/unified_action_executor.py
**Function:**
def substitute_variables(value: str, inputs: Dict[str, Any], context: Dict[str, Any]) -> Any:
"""Substitute variables in parameter value."""
if not isinstance(value, str):
return value
# Replace ${input.field} variables
value = re.sub(r'\$\{input\.([\w.]+)\}', lambda m: str(inputs.get(m.group(1), '')), value)
# Replace ${step_N.field} variables
value = re.sub(r'\$\{step_(\d+)\.([\w.]+)\}', lambda m: str(context.get(int(m.group(1)), {}).get(m.group(2), '')), value)
# Replace ${step_step_id.field} variables
value = re.sub(r'\$\{step_([\w-]+)\.([\w.]+)\}', lambda m: str(context.get(m.group(1), {}).get(m.group(2), '')), value)
return value**Execution Flow:**
- Collect template input values from user
- Execute steps in dependency order
- For each step, substitute variables in parameters
- Use resolved values for step execution
- Store step output in context for next steps
Validation Rules
**Variable Syntax Validation:**
- **Source Validation**
- Source must be
inputor start withstep_ - Invalid sources:
${foo.bar},${123.field}
- **Field Path Validation**
- Field path must be specified after dot
- Invalid patterns:
${input},${step_1}
- **Step Reference Validation**
- Step index must be numeric or valid step_id
- Invalid patterns:
${step_abc.field},${step_.field}
- **Input Reference Validation**
- Input field must exist in template.inputs
- Error if input not defined
- **Step Output Validation**
- Referenced step must exist in template.steps
- Error if step not found
**Validation Errors:**
interface ValidationError {
type:
| "invalid_source"
| "invalid_field_path"
| "invalid_step_reference"
| "input_not_found"
| "step_not_found";
message: string;
variable: string;
location: string; // Step ID or parameter name
}---
API Endpoints
GET /api/workflows-templates (Template Listing)
**Purpose:** List workflow templates with filtering.
**Request:**
GET /api/workflows-templates?category=integration&search=salesforce
Authorization: Bearer {session_token}**Query Parameters:**
category(optional): Filter by category (integration,automation,approval)search(optional): Search in name and description
**Response:**
{
"templates": [
{
"template_id": "salesforce-slack-sync-1.0.0",
"name": "Salesforce to Slack Sync",
"description": "Sync new Salesforce leads to Slack channel",
"category": "integration",
"complexity": "intermediate",
"inputs": [
{
"name": "slack_channel",
"label": "Slack Channel",
"type": "string",
"required": true,
"default_value": "#sales"
}
],
"steps": [
{
"step_id": "query_leads",
"name": "Query Salesforce Leads",
"step_type": "integration_action",
"service": "salesforce",
"action": "query_leads",
"parameters": [{ "name": "limit", "default_value": "10" }]
},
{
"step_id": "send_slack",
"name": "Send to Slack",
"step_type": "integration_action",
"service": "slack",
"action": "send_message",
"depends_on": ["query_leads"],
"parameters": [
{ "name": "channel", "default_value": "${input.slack_channel}" },
{
"name": "message",
"default_value": "New leads: ${step_query_leads.count}"
}
]
}
],
"is_public": true,
"tenant_id": null,
"tags": ["salesforce", "slack", "sync"]
}
],
"total": 1
}**Backend Implementation:**
@router.get("/workflows-templates")
async def browse_templates(
category: Optional[str] = None,
search: Optional[str] = None,
session: Session = Depends(get_db),
tenant: Tenant = Depends(get_tenant)
):
manager = WorkflowTemplateManager(session)
templates = manager.browse_templates(
tenant_id=tenant.id,
category=category,
search=search
)
return {"templates": templates, "total": len(templates)}GET /api/workflows-templates/{templateId} (Template Fetch)
**Purpose:** Fetch single template by ID.
**Request:**
GET /api/workflows-templates/salesforce-slack-sync-1.0.0
Authorization: Bearer {session_token}**Response:** Same as template object in listing above.
GET /api/integrations (Capability Discovery)
**Purpose:** Discover available integrations and operations.
**Request:**
GET /api/integrations
Authorization: Bearer {session_token}**Response:**
{
"integrations": [
{
"connector_id": "slack",
"name": "Slack",
"description": "Send messages and channels",
"icon": "slack-icon.png",
"connection_status": "connected",
"operations": [
{
"name": "send_message",
"description": "Send message to channel",
"parameters": [
{ "name": "channel", "type": "string", "required": true },
{ "name": "message", "type": "string", "required": true }
]
}
]
}
]
}**Backend Implementation:**
@router.get("/integrations")
async def discover_integrations(
session: Session = Depends(get_db),
tenant: Tenant = Depends(get_tenant)
):
registry = IntegrationRegistry(session)
integrations = registry.discover_all(tenant_id=tenant.id)
return {"integrations": integrations}GET /api/entities/types (Entity Types)
**Purpose:** List custom entity types for tenant.
**Request:**
GET /api/entities/types
Authorization: Bearer {session_token}**Response:**
{
"entity_types": [
{
"name": "Lead",
"schema": {
"first_name": { "type": "string" },
"last_name": { "type": "string" },
"email": { "type": "string" },
"status": {
"type": "string",
"enum": ["new", "contacted", "qualified"]
}
},
"maturity_level": "intern"
}
]
}GET /api/agents (Agent Listing)
**Purpose:** List agents for tenant.
**Request:**
GET /api/agents
Authorization: Bearer {session_token}**Response:**
{
"agents": [
{
"id": "agent-123",
"name": "Sales Assistant",
"role": "Sales Assistant",
"maturity_level": "intern",
"capabilities": ["analyze_leads", "send_followup"]
}
]
}POST /api/workflows/execute (Workflow Execution)
**Purpose:** Execute workflow from visual builder.
**Request:**
POST /api/workflows/execute
Authorization: Bearer {session_token}
Content-Type: application/json
{
"steps": [
{
"step_id": "query_leads",
"step_type": "integration_action",
"connector_id": "salesforce",
"operation": "query_leads",
"parameters": { "limit": 10 }
},
{
"step_id": "send_slack",
"step_type": "integration_action",
"connector_id": "slack",
"operation": "send_message",
"depends_on": ["query_leads"],
"parameters": {
"channel": "${input.slack_channel}",
"message": "New leads: ${step_query_leads.count}"
}
}
],
"inputs": {
"slack_channel": "#sales"
}
}**Response:**
{
"execution_id": "exec-456",
"status": "completed",
"results": [
{
"step_id": "query_leads",
"status": "success",
"output": { "count": 5, "leads": [...] }
},
{
"step_id": "send_slack",
"status": "success",
"output": { "message_ts": "123456.789" }
}
],
"errors": []
}**Error Response:**
{
"execution_id": "exec-456",
"status": "failed",
"results": [],
"errors": [
{
"step_id": "send_slack",
"error": "Integration not connected",
"code": "INTEGRATION_NOT_CONNECTED"
}
]
}**Backend Implementation:**
@router.post("/workflows/execute")
async def execute_workflow(
request: WorkflowExecutionRequest,
session: Session = Depends(get_db),
tenant: Tenant = Depends(get_tenant)
):
executor = UnifiedActionExecutor(session)
execution_id = str(uuid.uuid4())
results = []
errors = []
context = {}
for step in request.steps:
try:
result = await executor.dispatch(
tenant_id=tenant.id,
step=step,
context=context
)
results.append(result)
context[step.step_id] = result.output
except Exception as e:
errors.append({
"step_id": step.step_id,
"error": str(e),
"code": getattr(e, "code", "UNKNOWN_ERROR")
})
return {
"execution_id": execution_id,
"status": "completed" if not errors else "failed",
"results": results,
"errors": errors
}POST /api/workflows/save-template (Save Template)
**Purpose:** Save workflow as template.
**Request:**
POST /api/workflows/save-template
Authorization: Bearer {session_token}
Content-Type: application/json
{
"name": "My Custom Workflow",
"description": "A custom workflow for my use case",
"category": "automation",
"complexity": "intermediate",
"steps": [...],
"is_public": false
}**Response:**
{
"template_id": "automation-my-custom-workflow-1.0.0",
"status": "saved",
"message": "Template saved successfully"
}**Backend Implementation:**
@router.post("/workflows/save-template")
async def save_template(
template_data: WorkflowTemplateCreate,
session: Session = Depends(get_db),
tenant: Tenant = Depends(get_tenant)
):
manager = WorkflowTemplateManager(session)
template_id = await manager.save_template(
template=template_data,
tenant_id=tenant.id
)
return {
"template_id": template_id,
"status": "saved",
"message": "Template saved successfully"
}---
Tenant Isolation
Template Scoping (Public + Tenant Templates)
**Rule:** Users see only:
- Public templates (
is_public=true) - Their own templates (
tenant_id matches)
**Implementation:**
def browse_templates(tenant_id: str, category: Optional[str] = None) -> List[WorkflowTemplate]:
query = self.session.query(WorkflowTemplate).filter(
or_(
WorkflowTemplate.is_public == True,
WorkflowTemplate.tenant_id == tenant_id
)
)
if category:
query = query.filter(WorkflowTemplate.category == category)
return query.all()**Security Benefit:** Prevents cross-tenant template leakage.
Integration Credential Filtering
**Rule:** Integration queries filter by tenant_id.
**Implementation:**
def get_service(self, tenant_id: str, connector_id: str) -> IntegrationService:
# Fetch tenant's integration configuration
tenant_integration = self.session.query(TenantIntegration).filter(
TenantIntegration.tenant_id == tenant_id,
TenantIntegration.connector_id == connector_id
).first()
if not tenant_integration:
raise IntegrationNotConfigured()
# Resolve credentials for tenant
credentials = self.resolve_credentials(tenant_id, connector_id)
# Load service with credentials
service = self._load_service(connector_id)
service.configure(credentials)
return service**Security Benefit:** Prevents cross-tenant credential access.
Entity Type Filtering
**Rule:** Entity queries filter by tenant_id.
**Implementation:**
async def query_entities(self, tenant_id: str, entity_type: str, filters: Dict[str, Any]) -> List[Entity]:
# Validate entity type exists for tenant
entity_def = self.session.query(EntityDefinition).filter(
EntityDefinition.tenant_id == tenant_id,
EntityDefinition.name == entity_type
).first()
if not entity_def:
raise EntityNotFound()
# Query entities with tenant filter
query = self.session.query(Entity).filter(
Entity.tenant_id == tenant_id,
Entity.entity_type == entity_type
)
# Apply filters
for key, value in filters.items():
query = query.filter(getattr(Entity, key) == value)
return query.all()**Security Benefit:** Prevents cross-tenant entity data access.
Agent Filtering
**Rule:** Agent queries filter by tenant_id.
**Implementation:**
async def get_agents(self, tenant_id: str) -> List[Agent]:
agents = self.session.query(Agent).filter(
Agent.tenant_id == tenant_id
).all()
return [
{
"id": agent.id,
"name": agent.name,
"role": agent.role,
"maturity_level": agent.maturity_level,
"capabilities": agent.capabilities
}
for agent in agents
]**Security Benefit:** Prevents cross-tenant agent access.
---
Execution Flow
Frontend: nodesToWorkflowSteps() Conversion
**File:** src/lib/workflow/executor.ts
**Function:**
export function nodesToWorkflowSteps(nodes: Node[]): WorkflowStep[] {
return nodes.map((node, index) => {
const step: WorkflowStep = {
step_id: node.id,
step_type: mapNodeTypeToStepType(node.type),
name: node.data.label,
};
// Add connector_id for integration nodes
if (node.type === "integration") {
step.connector_id = node.data.connector_id;
step.operation = node.data.operation;
step.parameters = node.data.parameters;
}
// Add entity_type for entity nodes
if (node.type === "entity") {
step.entity_type = node.data.entity_type;
step.operation = node.data.operation;
step.parameters = node.data.parameters;
}
// Add agent_id for agent nodes
if (node.type === "ai_node") {
step.agent_id = node.data.agent_id;
step.prompt = node.data.prompt;
}
// Add depends_on from edges
const incomingEdges = edges.filter((e) => e.target === node.id);
if (incomingEdges.length > 0) {
step.depends_on = incomingEdges.map((e) => e.source);
}
return step;
});
}**Node Type Mapping:**
function mapNodeTypeToStepType(nodeType: string): string {
const mapping = {
trigger: "trigger",
ai_node: "agent_execution",
integration: "integration_action",
entity: "entity_operation",
condition: "conditional_logic",
action: "action",
};
return mapping[nodeType] || "action";
}Backend: UnifiedActionExecutor.dispatch()
**File:** backend-saas/core/unified_action_executor.py
**Method:**
async def dispatch(self, tenant_id: str, step: WorkflowStep, context: Dict[str, Any]) -> ActionResult:
"""Dispatch workflow step to appropriate executor."""
# 1. Validate step structure
self._validate_step(step)
# 2. Check governance
if step.step_type == 'agent_execution':
decision = await self.governance_service.can_perform_action(
tenant_id=tenant_id,
agent_id=step.agent_id,
action=step.operation
)
if not decision.allowed:
raise GovernanceBlocked(decision.reason)
# 3. Check quota
await self.quota_service.check_quota(tenant_id, 'workflow_execution')
# 4. Route to executor
if step.step_type == 'integration_action':
result = await self.dispatch_integration_action(
tenant_id=tenant_id,
connector_id=step.connector_id,
action=step.operation,
params=step.parameters,
context=context
)
elif step.step_type == 'entity_operation':
result = await self.dispatch_entity_operation(
tenant_id=tenant_id,
entity_type=step.entity_type,
operation=step.operation,
params=step.parameters,
context=context
)
elif step.step_type == 'agent_execution':
result = await self.dispatch_agent_step(
tenant_id=tenant_id,
agent_id=step.agent_id,
step=step,
context=context
)
else:
raise UnknownStepType(step.step_type)
# 5. Log action
await self._log_action_result(tenant_id, step, result)
# 6. Return result
return resultGovernance Checks via AgentGovernanceService
**File:** backend-saas/core/agent_governance.py
**Method:**
async def can_perform_action(self, tenant_id: str, agent_id: str, action: str) -> GovernanceDecision:
"""Check if agent can perform action."""
# Get agent
agent = self.session.query(Agent).filter(
Agent.tenant_id == tenant_id,
Agent.id == agent_id
).first()
if not agent:
raise AgentNotFound()
# Check maturity-based permissions
if agent.maturity_level == 'student' and action in DESTRUCTIVE_ACTIONS:
return GovernanceDecision(
allowed=False,
reason="Student agents cannot perform destructive actions"
)
if agent.maturity_level == 'intern' and action in RESTRICTED_ACTIONS:
return GovernanceDecision(
allowed=False,
reason="Intern agents require approval for this action"
)
return GovernanceDecision(allowed=True)Integration Dispatch via IntegrationRegistry
**Method:**
async def dispatch_integration_action(
self,
tenant_id: str,
connector_id: str,
action: str,
params: Dict[str, Any],
context: Dict[str, Any]
) -> ActionResult:
"""Dispatch integration action."""
# Get service from registry
service = await self.registry.get_service(tenant_id, connector_id)
# Substitute variables in parameters
resolved_params = self._substitute_variables(params, context)
# Execute operation with retry
result = await self._execute_with_retry(
lambda: service.execute_operation(action, resolved_params)
)
return ActionResult(
status="success",
output=result
)Entity Operations via EntitySkillExecutor
**File:** backend-saas/core/entity_skill_executor.py
**Method:**
async def create_entity(self, tenant_id: str, entity_type: str, data: Dict[str, Any], maturity_level: str) -> Entity:
"""Create entity instance."""
# Validate maturity permissions
if maturity_level == 'student':
raise PermissionDenied("Student agents cannot create entities")
# Validate entity schema
entity_def = self._validate_entity_type(tenant_id, entity_type)
# Create entity
entity = Entity(
tenant_id=tenant_id,
entity_type=entity_type,
data=data
)
self.session.add(entity)
self.session.commit()
return entityResult Propagation Back to UI
**Frontend Display:**
const handleExecuteWorkflow = async () => {
const steps = nodesToWorkflowSteps(nodes);
const response = await fetch("/api/workflows/execute", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
steps,
inputs: variableValues,
}),
});
const result = await response.json();
if (result.status === "completed") {
toast({
title: "Workflow executed successfully",
description: `Completed ${result.results.length} steps`,
});
} else {
toast({
title: "Workflow execution failed",
description: result.errors.map((e) => e.error).join(", "),
variant: "destructive",
});
}
};---
Error Handling
Governance Blocks (403)
**Scenario:** Agent tries to perform action beyond maturity level.
**Error Response:**
{
"error": "Governance blocked",
"code": "GOVERNANCE_BLOCKED",
"message": "Student agents cannot perform destructive actions",
"step_id": "delete_record",
"status": 403
}**User-Facing Message:** "Your agent's maturity level does not permit this action. Please upgrade agent maturity or contact administrator."
Quota Exceeded (429)
**Scenario:** Tenant exceeds daily workflow execution quota.
**Error Response:**
{
"error": "Quota exceeded",
"code": "QUOTA_EXCEEDED",
"message": "Daily workflow execution quota exceeded (500/500)",
"quota_limit": 500,
"quota_used": 500,
"quota_reset": "2026-03-29T00:00:00Z",
"status": 429
}**User-Facing Message:** "You've reached your daily workflow execution limit. Upgrade your plan for more executions."
Integration Not Connected (400)
**Scenario:** User tries to execute integration action without connecting OAuth.
**Error Response:**
{
"error": "Integration not connected",
"code": "INTEGRATION_NOT_CONNECTED",
"message": "Slack integration not connected. Please connect your Slack account.",
"connector_id": "slack",
"connect_url": "https://atomagentos.com/api/integrations/slack/connect",
"status": 400
}**User-Facing Message:** "Please connect your Slack account before using this integration."
General Errors (500)
**Scenario:** Unexpected server error during workflow execution.
**Error Response:**
{
"error": "Internal server error",
"code": "INTERNAL_ERROR",
"message": "An unexpected error occurred. Please try again later.",
"request_id": "req-123456",
"status": 500
}**User-Facing Message:** "Something went wrong. Please try again or contact support."
User-Facing Error Messages
**Display Strategy:**
- Map error codes to user-friendly messages
- Provide actionable next steps
- Include links to relevant actions (e.g., connect integration)
- Show technical details only in development mode
**Implementation:**
const getErrorMessage = (error: any): string => {
const errorMessages = {
GOVERNANCE_BLOCKED:
"Your agent's maturity level does not permit this action.",
QUOTA_EXCEEDED: "You've reached your daily workflow execution limit.",
INTEGRATION_NOT_CONNECTED: `Please connect your ${error.connector_id} account.`,
INTERNAL_ERROR: "Something went wrong. Please try again later.",
};
return errorMessages[error.code] || error.message;
};---
Testing
Test File Locations
**Frontend Tests:**
src/components/Automations/__tests__/WorkflowBuilder.test.tsxsrc/components/Automations/__tests__/TemplateBrowser.test.tsxsrc/lib/workflow/__tests__/template-loader.test.tssrc/lib/workflow/__tests__/executor.test.ts
**Backend Tests:**
backend-saas/tests/integration/test_visual_builder.pybackend-saas/tests/integration/test_workflow_execution.pybackend-saas/tests/unit/test_template_to_reactflow.py
**E2E Tests:**
backend-saas/tests/e2e/test_visual_builder_e2e.py
Coverage Targets
**Frontend:**
- Component tests: 80% coverage
- Utility functions: 85% coverage
- Integration tests: 75% coverage
**Backend:**
- Unit tests: 80% coverage
- Integration tests: 75% coverage
- E2E tests: 70% coverage
**AI Systems:**
- Agent governance: 85% coverage
- Integration dispatch: 80% coverage
Tenant Isolation Test Patterns
**Pattern 1: Dual-Tenant Template Isolation**
def test_template_tenant_isolation():
# Create template for tenant A
template_a = create_template(tenant_id="tenant-a", is_public=False)
# Fetch templates for tenant B
templates_b = browse_templates(tenant_id="tenant-b")
# Assert tenant A's template not visible to tenant B
assert template_a.id not in [t.id for t in templates_b]**Pattern 2: Cross-Tenant Credential Prevention**
def test_credential_tenant_isolation():
# Configure Slack for tenant A
configure_integration(tenant_id="tenant-a", connector_id="slack", token="token-a")
# Try to execute integration as tenant B
with pytest.raises(IntegrationNotConfigured):
execute_integration(tenant_id="tenant-b", connector_id="slack")**Pattern 3: Entity Data Isolation**
def test_entity_tenant_isolation():
# Create entity for tenant A
entity_a = create_entity(tenant_id="tenant-a", entity_type="Lead", data={"name": "Lead A"})
# Query entities for tenant B
entities_b = query_entities(tenant_id="tenant-b", entity_type="Lead")
# Assert tenant A's entity not visible to tenant B
assert entity_a.id not in [e.id for e in entities_b]E2E Test Scenarios
**Scenario 1: Template Loading and Execution**
async def test_load_and_execute_template():
# 1. Browse templates
templates = await browse_templates(category="integration")
assert len(templates) > 0
# 2. Load template into builder
template = templates[0]
nodes, edges = template_to_reactflow(template)
# 3. Execute workflow
result = await execute_workflow(nodes, edges, inputs={})
assert result["status"] == "completed"**Scenario 2: Integration Node Creation**
async def test_create_integration_node():
# 1. Fetch integrations
integrations = await discover_integrations()
slack = next(i for i in integrations if i["connector_id"] == "slack")
# 2. Create integration node
node = create_integration_node(
connector_id="slack",
operation="send_message",
parameters={"channel": "#test", "message": "Hello"}
)
# 3. Verify node structure
assert node["type"] == "integration"
assert node["data"]["connector_id"] == "slack"
assert node["data"]["operation"] == "send_message"**Scenario 3: Variable Substitution**
async def test_variable_substitution():
# 1. Create template with variables
template = {
"inputs": [{"name": "name", "type": "string"}],
"steps": [
{
"step_id": "greet",
"operation": "send_message",
"parameters": {
"message": "Hello, ${input.name}!"
}
}
]
}
# 2. Execute with input
result = await execute_workflow(
template,
inputs={"name": "Alice"}
)
# 3. Verify variable substituted
assert result["steps"][0]["output"]["message"] == "Hello, Alice!"---
Future Enhancements
Sub-Workflows
**Goal:** Enable reusable workflow components that can be nested within larger workflows.
**Use Cases:**
- Common approval process reused across multiple workflows
- Standard data enrichment pipeline
- Shared error handling workflow
**Implementation:**
- Add
sub_workflownode type - Support workflow composition
- Pass context between parent and child workflows
- Visual indication of sub-workflow boundaries
Workflow Versioning
**Goal:** Track workflow changes over time with version history.
**Use Cases:**
- Rollback to previous workflow version
- Compare workflow versions
- Audit trail of workflow changes
**Implementation:**
- Store workflow versions in database
- Add version field to workflow metadata
- Version diff UI
- Rollback functionality
Template Marketplace
**Goal:** Community-driven template sharing and discovery.
**Use Cases:**
- Browse community-contributed templates
- Rate and review templates
- Submit templates to marketplace
- Featured templates and categories
**Implementation:**
- Public template repository
- Template rating and review system
- Submission and moderation workflow
- Marketplace UI with search and filtering
Collaborative Editing
**Goal:** Multiple users editing workflows simultaneously.
**Use Cases:**
- Team-based workflow building
- Real-time collaboration
- Conflict resolution
**Implementation:**
- WebSocket-based real-time sync
- Operational transformation for conflict resolution
- User presence indicators
- Edit history and attribution
---
**Documentation Version:** 1.0.0
**Last Updated:** 2026-03-28
**Maintained By:** Phase 207 Development Team